package com.atguigu.flink.watermark;

import com.atguigu.flink.function.WaterSensorMapFunction;
import com.atguigu.flink.pojo.WaterSensor;
import com.atguigu.flink.utils.MyUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.time.Duration;

/**
 * Created by Smexy on 2023/11/15

    水印作用是触发基于EventTime语义下时间窗口的计算。
        针对的对象是窗口，窗口关闭之后到达的数据称为迟到的数据。以此作为定义。
        针对的对象是算子的时钟(水印)，数据的时间属性 < 算子的水印就是迟到。

    1.延迟算子的时钟(推迟水印)
        延迟之前，窗口是4999就触发运算
        延迟水印3000，窗口是延迟前时间的7999触发，是延迟后时间的4999触发运算

    2.推迟窗口的关闭时间，让窗口到点就触发计算，但是不要关。
        迟到的数据依旧有机会进入窗口

    3.操作的还是窗口，把无法进入窗口的数据，使用侧流接收。
        后续再伺机处理
 */
public class Demo3_HandleLate
{
    public static void main(String[] args) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
            //推迟水印 当前水印: Task收到的最大事件时间 - 3s - 1ms
            .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
            .withTimestampAssigner( (e, ts) -> e.getTs());

        OutputTag<WaterSensor> outputTag = new OutputTag<>("late", TypeInformation.of(WaterSensor.class));
        SingleOutputStreamOperator<String> mainDs = env
            .socketTextStream("hadoop102", 8888)
            .map(new WaterSensorMapFunction())
            .assignTimestampsAndWatermarks(watermarkStrategy)
            /*
              基于事件时间的窗口计算。此时 水印就是时间
                 5s滚动的时间窗口:
                     第一个窗口: [0,5000) 等价于 [0,4999]。 水印到达4999就计算。
                     第二个窗口: [5000,9999]。  水印到达9999就计算。
                     ....
             */
            .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
            //窗口到点触发了计算后，推迟3s再关闭 第一个窗口: [0,4999]，会在4999触发计算，但是不关，会在7999再关闭。
            //数据只要在7999之前能到，就能进入窗口，没进入一次窗口，窗口就触发一次运算
            .allowedLateness(Time.seconds(3))
            //窗口关闭后的数据，使用侧流接收
            .sideOutputLateData(outputTag)
            .process(new ProcessAllWindowFunction<WaterSensor, String, TimeWindow>()
            {
                @Override
                public void process(Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                    TimeWindow window = context.window();
                    out.collect(window + ":" + MyUtil.parseToList(elements));
                }
            });

            mainDs.print();

            //获取侧流
            mainDs.getSideOutput(outputTag).printToErr("迟到");



        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}
